{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Execute this cell to install dependencies\n",
    "%pip install sf-hamilton[visualization] dlt[duckdb]>=0.3.12 ibis-framework[duckdb] openai pandas polars"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Slack Summaries [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/examples/dlt/notebook.ipynb) [![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)](https://github.com/apache/hamilton/blob/main/examples/dlt/notebook.ipynb)\n",
    "\n",
    "This notebook shows how to ingest Slack messages and generate threads summaries.\n",
    "\n",
    "NOTE. This notebook uses data generated on a dummy Slack server, so messages, replies, and summaries aren't really insightful.\n",
    "\n",
    "## Pre-requisites\n",
    "The first step is to `dlt init` the Slack pipeline, but this is already done for this repository. All you need to do is configure your Slack application to get a User OAuth Token (see [docs](https://dlthub.com/docs/dlt-ecosystem/verified-sources/slack)). Then, you can it add it to `.dlt/secrets.toml` as follow:\n",
    "\n",
    "```toml\n",
    "[sources.slack]\n",
    "access_token = \"xoxb-...\"\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Extract & Load (dlt)\n",
    "The \"extract\" and \"load\" steps consist of getting the raw data to destination for further transformations.\n",
    "\n",
    "### Set up\n",
    "Define your dlt `Pipeline` and `Source`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dlt\n",
    "import slack\n",
    "\n",
    "slack_pipeline = dlt.pipeline(\n",
    "    pipeline_name=\"slack\",\n",
    "    destination='duckdb',\n",
    "    dataset_name=\"slack_data\",\n",
    "    full_refresh=True,\n",
    ")\n",
    "\n",
    "dlt_source = slack.slack_source(\n",
    "    selected_channels=[\"general\", \"dlt\"],\n",
    "    replies=True,\n",
    ")\n",
    "\n",
    "print(f\"\"\"{slack_pipeline.dataset_name=:}\n",
    "{slack_pipeline.pipeline_name=:}\"\"\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Execution\n",
    "Run your dlt pipeline. `load_info` will contain a lot metadata about execution."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "load_info = slack_pipeline.run(dlt_source)\n",
    "print(load_info)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Transform (Hamilton + Ibis)\n",
    "\n",
    "Now that raw data is loaded (in DuckDB), we use Hamilton to organize a dataflow of Ibis data transformations.  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define\n",
    "\n",
    "In the next cells, we use Python functions to define a Hamilton dataflow. \n",
    "\n",
    "By using the Hamilton Jupyter plugin, we can define the dataflow interactively. Adding and removing functions to the cell will change its shape. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext hamilton.plugins.jupyter_magic\n",
    "from IPython.display import display"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%cell_to_module -m jupyter_transform -d\n",
    "\n",
    "import textwrap\n",
    "\n",
    "import dlt\n",
    "import ibis\n",
    "import ibis.expr.types as ir\n",
    "import openai\n",
    "from hamilton.function_modifiers import pipe, source, step\n",
    "from hamilton.htypes import Parallelizable, Collect\n",
    "\n",
    "\n",
    "def db_con(pipeline: dlt.Pipeline) -> ibis.BaseBackend:\n",
    "    backend = ibis.connect(f\"{pipeline.pipeline_name}.duckdb\")\n",
    "    ibis.set_backend(backend)\n",
    "    return backend\n",
    "\n",
    "\n",
    "def channel(selected_channels: list[str]) -> Parallelizable[str]:\n",
    "    for channel in selected_channels:\n",
    "        yield channel\n",
    "\n",
    "\n",
    "def _epoch_microseconds(timestamp: ir.Column) -> ir.Column:\n",
    "    seconds_from_epoch = timestamp.epoch_seconds()\n",
    "    microseconds = timestamp.microsecond() / int(10e5)\n",
    "    return seconds_from_epoch + microseconds\n",
    "    \n",
    "\n",
    "def channel_message(\n",
    "    channel: str,\n",
    "    db_con: ibis.BaseBackend, \n",
    "    pipeline: dlt.Pipeline,\n",
    ") -> ir.Table:\n",
    "    \"\"\"Load table containing parent messages of a channel.\n",
    "    the timestamps `thread_ts` and `ts` are converted to strings.\n",
    "    `thread_ts` is not None if the message has replies / started a thread. Otherwise,\n",
    "    `thread_ts` == `ts`. Coalesce is used to fill these None values with `ts`\n",
    "\n",
    "    Slack reference: https://api.slack.com/messaging/retrieving#finding_threads\n",
    "    \"\"\"\n",
    "    return (\n",
    "        db_con.table(\n",
    "            f\"{channel}_message\",\n",
    "            schema=pipeline.dataset_name,\n",
    "            database=pipeline.pipeline_name,\n",
    "        )\n",
    "        .mutate(\n",
    "            thread_ts=_epoch_microseconds(ibis._.thread_ts).cast(str),\n",
    "            ts=_epoch_microseconds(ibis._.ts).cast(str),\n",
    "        )\n",
    "        .mutate(thread_ts=ibis.coalesce(ibis._.thread_ts, ibis._.ts))\n",
    "    )\n",
    "\n",
    "\n",
    "def channel_replies(\n",
    "    channel: str,\n",
    "    db_con: ibis.BaseBackend, \n",
    "    pipeline: dlt.Pipeline,\n",
    ") -> ir.Table:\n",
    "    \"\"\"Create table for replies\"\"\"\n",
    "    return db_con.table(\n",
    "        f\"{channel}_replies_message\",\n",
    "        schema=pipeline.dataset_name,\n",
    "        database=pipeline.pipeline_name,\n",
    "    )\n",
    "    \n",
    "\n",
    "def channel_threads(channel_message: ir.Table, channel_replies: ir.Table) -> ir.Table:\n",
    "    \"\"\"Union of parent messages and replies. Sort by thread start, then message timestamp\"\"\"\n",
    "    columns = [\"channel\", \"thread_ts\", \"ts\", \"user\", \"text\", \"_dlt_load_id\", \"_dlt_id\"]\n",
    "    return (\n",
    "        ibis.union(\n",
    "            channel_message.select(columns),\n",
    "            channel_replies.select(columns),\n",
    "        )\n",
    "        .order_by([ibis._.thread_ts, ibis._.ts])\n",
    "    )\n",
    "\n",
    "\n",
    "def channels_collection(channel_threads: Collect[ir.Table]) -> ir.Table:\n",
    "    \"\"\"Collect `channel_threads` for all channels\"\"\"\n",
    "    return ibis.union(*channel_threads)\n",
    "\n",
    "\n",
    "def _format_messages(threads: ir.Table) -> ir.Table:\n",
    "    \"\"\"Assign a user id per thread and prefix messages with it\"\"\"\n",
    "    thread_user_id_expr = (ibis.dense_rank().over(order_by=\"user\") + 1).cast(str)\n",
    "    return threads.group_by(\"thread_ts\").mutate(\n",
    "        message=thread_user_id_expr.concat(\": \", ibis._.text)\n",
    "    )\n",
    "\n",
    "\n",
    "def _aggregate_thread(threads: ir.Table) -> ir.Table:\n",
    "    \"\"\"Create threads as a single string by concatenating messages\n",
    "\n",
    "    Functions decorates with `@ibis.udf` are loaded by the Ibis backend.\n",
    "    They aren't meant to be called directly.\n",
    "    ref: https://ibis-project.org/how-to/extending/builtin\n",
    "    \"\"\"\n",
    "\n",
    "    @ibis.udf.agg.builtin(name=\"string_agg\")\n",
    "    def _string_agg(arg, sep: str = \"\\n \") -> str:\n",
    "        raise NotImplementedError\n",
    "\n",
    "    @ibis.udf.agg.builtin(name=\"array_agg\")\n",
    "    def _array_agg(arg) -> list[str]:\n",
    "        raise NotImplementedError\n",
    "\n",
    "    return threads.group_by(\"thread_ts\").agg(\n",
    "        thread=_string_agg(ibis._.message),\n",
    "        num_messages=ibis._.count(),\n",
    "        users=_array_agg(ibis._.user).unique(),\n",
    "        _dlt_load_id=ibis._._dlt_load_id.max(),\n",
    "        _dlt_id=_array_agg(ibis._._dlt_id),\n",
    "    )\n",
    "\n",
    "\n",
    "def summary_prompt() -> str:\n",
    "    \"\"\"LLM prompt to summarize Slack thread\"\"\"\n",
    "    return textwrap.dedent(\n",
    "        \"\"\"Hamilton is an open source library to write dataflows in Python. It is used by developers for data engineering, data science, machine learning, and LLM workflows.\n",
    "        Next is a discussion thread about Hamilton started by User1. Complete these tasks: identify the issue raised by User1, summarize the discussion, indicate if you think the issue was resolved.\n",
    "\n",
    "        DISCUSSION THREAD\n",
    "        {text}\n",
    "        \"\"\"\n",
    "    )\n",
    "\n",
    "def _summary(threads: ir.Table, prompt: str) -> ir.Table:\n",
    "    \"\"\"Generate a summary for each thread.\n",
    "    Uses a scalar Python UDF executed by the backend.\n",
    "    \"\"\"\n",
    "    # Ibis requires `str` type hint even if None is allowed\n",
    "    @ibis.udf.scalar.python\n",
    "    def _openai_completion_udf(text: str, prompt_template: str) -> str:\n",
    "        \"\"\"Fill `prompt` with `text` and use OpenAI chat completion.\n",
    "        Returns None if:\n",
    "            - `text` is empty\n",
    "            - `content` is too long\n",
    "            - OpenAI call fails\n",
    "        \"\"\"\n",
    "        if len(text) == 0:\n",
    "            return None\n",
    "\n",
    "        content = prompt_template.format(text=text)\n",
    "\n",
    "        if len(content) // 4 > 8191:\n",
    "            return None\n",
    "\n",
    "        client = openai.OpenAI()\n",
    "\n",
    "        response = client.chat.completions.create(\n",
    "            model=\"gpt-3.5-turbo\",\n",
    "            messages=[{\"role\": \"user\", \"content\": content}],\n",
    "        )\n",
    "        try:\n",
    "            output = response.choices[0].message.content\n",
    "        except Exception:\n",
    "            output = None\n",
    "\n",
    "        return output\n",
    "    \n",
    "    return threads.mutate(summary=_openai_completion_udf(threads.thread, prompt))\n",
    "\n",
    "\n",
    "# @pipe operator facilitates managing the function/node namespace\n",
    "@pipe(\n",
    "    step(_format_messages), step(_aggregate_thread), step(_summary, prompt=source(\"summary_prompt\"))\n",
    ")\n",
    "def threads(channels_collection: ir.Table) -> ir.Table:\n",
    "    \"\"\"Create `threads` table by formatting, aggregating messages,\n",
    "    and generating summaries.\n",
    "    \"\"\"\n",
    "    return channels_collection\n",
    "\n",
    "\n",
    "def insert_threads(threads: ir.Table) -> int:\n",
    "    \"\"\"Save `threads` table and return row count.\"\"\"\n",
    "    db_con = ibis.get_backend()\n",
    "    threads_table = db_con.create_table(\"threads\", threads)\n",
    "    db_con.insert(\"threads\", threads)\n",
    "    return int(threads_table.count().execute())"
   ]
  },
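  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The helper `_epoch_microseconds` above adds a microsecond fraction to the seconds elapsed since the Unix epoch, which is the `seconds.microseconds` layout Slack uses for `ts`. As a minimal sketch, the same arithmetic can be reproduced in plain Python. The function name `epoch_with_fraction` and the sample datetime are hypothetical, and we assume `epoch_seconds()` yields whole seconds.\n",
    "\n",
    "```python\n",
    "from datetime import datetime, timezone\n",
    "\n",
    "\n",
    "def epoch_with_fraction(timestamp: datetime) -> float:\n",
    "    \"\"\"Whole seconds since the Unix epoch plus the microsecond fraction.\"\"\"\n",
    "    seconds_from_epoch = int(timestamp.timestamp())  # drop the fractional part\n",
    "    microseconds = timestamp.microsecond / 1_000_000  # 250_000 microseconds -> 0.25 s\n",
    "    return seconds_from_epoch + microseconds\n",
    "\n",
    "\n",
    "# Hypothetical sample value: 2024-01-01 00:00:00.250000 UTC\n",
    "sample = datetime(2024, 1, 1, 0, 0, 0, 250_000, tzinfo=timezone.utc)\n",
    "value = epoch_with_fraction(sample)\n",
    "print(value)  # 1704067200.25\n",
    "```\n",
    "\n",
    "In the dataflow, the result is then cast to a string so that `thread_ts` and `ts` can be compared and coalesced."
   ]
  },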
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Execute\n",
    "Create a Hamilton Driver with the defined dataflow module and request nodes to compute. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Driver definition\n",
    "\n",
    "We pass the module `jupyter_transform` that we defined in this notebook, but it could be modules stored in `.py` files. The `.enable_dynamic_execution()` statement is required because we're using Hamilton's `Parallelizable/Collect` feature."
   ]
  },
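  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see `Parallelizable/Collect` in isolation, here is a minimal sketch of the fan-out / fan-in pattern on a toy dataflow. The functions `name_length` and `total_length` and the names `toy_module`, `toy_dr`, and `toy_results` are hypothetical and only serve the illustration. We assume `create_temporary_module` from `hamilton.ad_hoc_utils` is available in your Hamilton version.\n",
    "\n",
    "```python\n",
    "from hamilton import driver\n",
    "from hamilton.ad_hoc_utils import create_temporary_module\n",
    "from hamilton.htypes import Collect, Parallelizable\n",
    "\n",
    "\n",
    "def channel(selected_channels: list[str]) -> Parallelizable[str]:\n",
    "    \"\"\"Fan out: one branch per channel name.\"\"\"\n",
    "    for name in selected_channels:\n",
    "        yield name\n",
    "\n",
    "\n",
    "def name_length(channel: str) -> int:\n",
    "    \"\"\"Runs once per branch (hypothetical stand-in for real work).\"\"\"\n",
    "    return len(channel)\n",
    "\n",
    "\n",
    "def total_length(name_length: Collect[int]) -> int:\n",
    "    \"\"\"Fan in: receives the results of all branches.\"\"\"\n",
    "    return sum(name_length)\n",
    "\n",
    "\n",
    "# Bundle the functions into a throwaway module for the Driver\n",
    "toy_module = create_temporary_module(channel, name_length, total_length)\n",
    "\n",
    "toy_dr = (\n",
    "    driver.Builder()\n",
    "    .enable_dynamic_execution(allow_experimental_mode=True)\n",
    "    .with_modules(toy_module)\n",
    "    .build()\n",
    ")\n",
    "toy_results = toy_dr.execute([\"total_length\"], inputs={\"selected_channels\": [\"general\", \"dlt\"]})\n",
    "print(toy_results[\"total_length\"])\n",
    "```\n",
    "\n",
    "The dataflow in this notebook follows the same pattern: `channel` fans out, the per-channel tables are built in each branch, and `channels_collection` collects them."
   ]
  },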
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hamilton import driver\n",
    "import jupyter_transform\n",
    "\n",
    "dr = (\n",
    "    driver.Builder()\n",
    "    .enable_dynamic_execution(allow_experimental_mode=True)\n",
    "    .with_modules(jupyter_transform)\n",
    "    .build()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Driver execution\n",
    "We create a dictionary to hold the inputs `pipeline` and `selected_channels` (the same passed to the dlt pipeline) required by the Hamilton dataflow.\n",
    "\n",
    "Then, we call `dr.execute()` with the list of nodes to compute and the inputs. This returns a dictionary of results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inputs = dict(\n",
    "    pipeline=slack_pipeline,\n",
    "    selected_channels=[\"general\", \"dlt\"],\n",
    ")  \n",
    "final_vars = [\"threads.with_aggregate_thread\"]\n",
    "\n",
    "results = dr.execute(final_vars, inputs=inputs)\n",
    "# `threads.with_aggregate_thread` is an ibis expr, execute it to return a pandas DataFrame\n",
    "df = results[\"threads.with_aggregate_thread\"].to_pandas()\n",
    "\n",
    "display(dr.visualize_execution(final_vars=final_vars, inputs=inputs), df)"
   ]
  },
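  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the shape of this intermediate table concrete, the sketch below mimics `_format_messages` and `_aggregate_thread` in plain Python. The rows, user IDs, and the name `toy_threads` are hypothetical. We assume the rows are already sorted by `ts` and that Ibis's `dense_rank()` starts at 0, which is why the dataflow adds 1.\n",
    "\n",
    "```python\n",
    "from collections import defaultdict\n",
    "\n",
    "# Hypothetical rows standing in for `channels_collection`, already sorted by `ts`\n",
    "rows = [\n",
    "    {\"thread_ts\": \"1.0\", \"ts\": \"1.0\", \"user\": \"U_BOB\", \"text\": \"Is caching supported?\"},\n",
    "    {\"thread_ts\": \"1.0\", \"ts\": \"1.1\", \"user\": \"U_ALICE\", \"text\": \"Yes, see the docs.\"},\n",
    "    {\"thread_ts\": \"1.0\", \"ts\": \"1.2\", \"user\": \"U_BOB\", \"text\": \"Thanks!\"},\n",
    "    {\"thread_ts\": \"2.0\", \"ts\": \"2.0\", \"user\": \"U_ALICE\", \"text\": \"Release is out.\"},\n",
    "]\n",
    "\n",
    "# Group messages by the thread they belong to\n",
    "by_thread = defaultdict(list)\n",
    "for row in rows:\n",
    "    by_thread[row[\"thread_ts\"]].append(row)\n",
    "\n",
    "toy_threads = {}\n",
    "for thread_ts, messages in by_thread.items():\n",
    "    # Number the users of this thread by sorted user ID, starting at 1\n",
    "    users = sorted({m[\"user\"] for m in messages})\n",
    "    user_number = {user: i for i, user in enumerate(users, start=1)}\n",
    "    lines = [f\"{user_number[m['user']]}: {m['text']}\" for m in messages]\n",
    "    toy_threads[thread_ts] = {\n",
    "        \"thread\": \"\\n \".join(lines),  # same separator as `_string_agg`\n",
    "        \"num_messages\": len(messages),\n",
    "        \"users\": users,\n",
    "    }\n",
    "\n",
    "print(toy_threads[\"1.0\"][\"thread\"])\n",
    "```\n",
    "\n",
    "Note that the numbering follows the sorted user IDs, so in this toy thread the person who started the discussion is labelled `2`, not `1`."
   ]
  },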
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The next cells require your OpenAI key to execute the full dataflow and compute the summaries. By requesting `insert_threads`, we could directly save the Ibis results without leaving DuckDB!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os \n",
    "import getpass\n",
    "\n",
    "os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"Enter your OpenAI key\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "final_vars = [\"threads\"]   # replace by `\"insert_threads\"` to directly store results\n",
    "results = dr.execute(final_vars, inputs=inputs)\n",
    "df2 = results[\"threads\"].to_pandas()\n",
    "\n",
    "display(dr.visualize_execution(final_vars=final_vars, inputs=inputs), df2)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
